热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

更多|本文_Springboot系列(三十二):Springboot集成kafka(环境搭建+演示)|超级详细,建议收藏

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Springboot系列(三十二):Springboot集成kafka(环境搭建+演示)|超级详细,建议收藏相关的知识,希望对你有一定的参考价

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Springboot系列(三十二):Springboot集成 kafka(环境搭建+演示)|超级详细,建议收藏相关的知识,希望对你有一定的参考价值。




👨‍🎓 作者:bug菌


🎉简介:在CSDN、掘金等社区优质创作者,全网合计6w粉+,对一切技术都感兴趣,重心偏java方向,目前运营公众号[猿圈奇妙屋],欢迎小伙伴们的加入,一起秃头。


🚫特别声明:原创不易,转载请附上原文出处链接和本文声明,谢谢配合。


🙏版权声明:文章里可能部分文字或者图片来源于互联网或者百度百科,如有侵权请联系bug菌处理。







【开发云】年年都是折扣价,不用四处薅羊毛


         嗨,家人们,我是bug菌呀,我又来啦。今天我们来聊点什么咧,OK,接着为大家更《springboot零基础入门教学》系列文章吧。希望能帮助更多的初学者们快速入门!



       小伙伴们在批阅文章的过程中如果觉得文章对您有一丝丝帮助,还请别吝啬您手里的赞呀,大胆的把文章
点亮👍吧,您的点赞三连(
收藏⭐️+关注👨‍🎓+留言📃)就是对bug菌我创作道路上最好的鼓励与支持😘。时光不弃🏃🏻‍♀️,创作不停💕,加油☘️

一、前言🔥

       上一期,我是带着大家入门了springboot集成RabbitMq,今天我再来一期kafka的零基础教学吧。不知道大家对kafka有多少了解,反正我就是从搭建开始,然后再加一个简单演示,这就算是带着大家了个门哈,剩下的我再后边慢慢出教程给大家说。


二、环境说明🔥



演示环境:idea2019.3 + springboot 2.3.1REALSE + windows10 + kafka



三、概念🔥

        kafka是linkedin开源的分布式发布-订阅消息系统,目前归属于Apache的顶级项目。主要特点是基于pull模式来处理消息消费,追求高吞吐量,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。

        一开始的目的是日志的收集和传输。0.8版本开始支持复制,不支持事务,对消息的丢失,重复,错误没有严格要求 适用于产生大量数据的互联网服务的数据收集业务。在廉价的服务器上都能有很高的性能,这个主要是基于操作系统底层的pagecache,不用内存胜似使用内存。

       综上所述,kafka是一款开源的消息引擎系统(消息队列/消息中间件) 分布式流处理平台。


四、Windows安装kafka🔥


1️⃣下载kafka安装包

下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz

下载完后是这么个东西:


2️⃣下载好后,进行解压


 3️⃣配置修改

        进入config目录,修改server.properties文件,把 log.dirs的值改成 ./logs


 4️⃣kafka启动

        在你的安装目录下的bin\\windows目录上直接输入cmd然后回车。

执行命令:输入

kafka-server-start.bat ../../config/server.properties

 然后可以看到控制台启动报错:

        可以看到,kafka还依赖于zookeeper。所以我们接下来再安装zookeeper,启动zookeeper后再启动kafka试试。


五、安装zookeeper🔥


1️⃣下载zookeeper安装包

        先下载zookeeper,下载地址:https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.7.0


 2️⃣解压

        下载完后进行解压,解压到指定的目录下;

        然后再将apache-zookeeper-3.7.0-bin也一并下载,进行解压完后,将目录下的lib文件夹复制到apache-zookeeper-3.7.0主目录下:

 否则后续启动肯定会报错。


 3️⃣修改配置

        修改zoo_sample.cfg 文件名(./conf) 为 zoo.cfg

        同样再编辑它指定日志位置,具体配置文件如下:(这里使用notepad++小绿本进行编辑);我这里是指定在同级目录下了。这个你根据实际情况而定。

         具体添加如下:仅供参考复制。

#原目录;直接注释掉
#dataDir=/tmp/zookeeper
#指定新目录
#保存数据的目录
dataDir=./data
# 保存日志文件的目录
dataLogDir=./log

4️⃣启动zookeeper

        进入到bin目录,并且启动zkServer.cmd,这个脚本中会启动一个java进程。

        如果你们遇到这个报错,请看上边第2点zookeeper解压。报错是因为找不到类包缺少lib这个jar包文件夹,所以你得下载bin包并把lib依赖都复制过来。

输入如下命名进行启动zookeeper

zkServer.cmd

具体启动如下:

 正常启动截图:

        再启动后jps可以看到QuorumPeerMain的进程。直接win+R 输入cmd然后再输入如下命令即可进行查询。

启动命令如下:

jps -l -v

启动客户端连接一下:

进入到/bin目录下,执行如下命令:

zkCli.cmd 127.0.0.1:2181

        如上可以看到, zookeeper启动ok。至此,zookeeper就安装完成啦。

        所以我们再来启动一下kafka看看,是否还会跟刚才一样报错。

        切记不要关zookeeper启动服务小黑窗,也就是你执行那串命令的窗口,若是关了你再重新启动即可。

zkServer.cmd

kafka启动成功截图:


六、kafka项目集成🔥


1️⃣pom引入



org.springframework.kafka
spring-kafka

2️⃣配置kafka

#配置kafka 服务器
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
# 发生错误后,消息重发的次数。
retries: 0
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 batch-size: 16384 # 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false

3️⃣topic初始化

package com.example.demo.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* topic初始化
*
* @author luoYong
* @version 1.0
* @date 2022/2/28 17:39
*/
@Configurationpublic class KafkaConfig
/**
* 创建一个名为topic.test的Topic并设置分区数为8,分区副本数为2
*/
@Bean public NewTopic initialTopic()
return new NewTopic("topic.test", 8, (short) 2);


4️⃣定义一个kafka消息发送端

package com.example.demo.component.kafka;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* kafka消息发送端
*
* @author luoYong
* @version 1.0
* @date 2022/2/28 17:40
*/
@Component
@Slf4j
public class KafkaProducer
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(Object obj)
String obj2String = JSONObject.toJSONString(obj);
// 发送消息
kafkaTemplate.send("topic.test", obj).addCallback(new ListenableFutureCallback>()
@Override
public void onFailure(Throwable throwable)
// 发送失败的处理
log.info("topic[] 生产者 发送消息失败[]", "topic.test", throwable.getMessage());

@Override
public void onSuccess(SendResult stringObjectSendResult)
// 成功的处理
log.info("topic[] 生产者 发送消息成功[]", "topic.test", stringObjectSendResult.getProducerRecord().value());

);


5️⃣定义一个kafka消息消费端

package com.example.demo.component.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* 消息接收端-支持多端消费
*
* @author luoYong
* @version 1.0
* @date 2022/2/28 17:42
*/
@Component@Slf4jpublic class KafkaConsumer
@KafkaListener(topics = "topic.test", groupId = "topic.group1")
public void topicTest(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
Optional message = Optional.ofNullable(record.value());
if (message.isPresent())
Object msg = message.get();
log.info("客户端 A 消费了: Topic[] Message[]", topic, msg);
ack.acknowledge();


@KafkaListener(topics = "topic.test", groupId = "topic.group1")
public void topicTest1(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
Optional message = Optional.ofNullable(record.value());
if (message.isPresent())
Object msg = message.get();
log.info("客户端 B 消费了: Topic[] Message[]", topic, msg);
ack.acknowledge();



6️⃣定义一个测试类进行测试

/**
* @author luoYong
* @version 1.0
* @date 2022/2/24 17:02
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class KafkaTest
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testSendMsg()
String msg = "hello";
kafkaProducer.send(msg);


7️⃣测试结果如下

       ... ...

       ok,以上就是我这期的全部内容啦,如果还想学习更多,你可以看看我的往期热文推荐哦,每天积累一个奇淫小知识,日积月累下去,你一定能成为令人敬仰的大佬的。好啦,咱们下期见~


七、往期推荐🔥


  • springboot系列(一):如何创建springboot项目及启动
  • springboot系列(二):yaml、properties两配置文件介绍及使用

  • springboot系列(三):多环境切换,实例演示
  • springboot系列(四):stater入门
  • springboot系列(五):史上最最最全springboot常用注解
  • springboot系列(六):mysql配置及数据库查询
  • springboot系列(七):如何通过mybatis-plus实现接口增删改查
  • springboot系列(八):mybatis-plus之条件构造器使用手册
  • springboot系列(九):mybatis-plus之如何自定义sql
  • springboot系列(十):mybatis之xml映射文件>、<=等特殊符号写法
  • springboot系列(十一):实现多数据源配置,开箱即用
  • springboot系列(十二):如何实现邮件发送提醒,你一定得会(准备篇)
  • springboot系列(十三):如何实现发送普通邮件?你一定得会
  • springboot系列(十四):如何实现发送图片、doc文档等附件邮件?你一定得会
  • springboot系列(十五):如何实现静态邮件模板发送?你一定得会
  • springboot系列(十六):如何实现发送邮件提醒,附完整源码
  • springboot系列(十七):集成在线接口文档Swagger2
  • springboot系列(十八):如何Windows安装redis?你玩过么
  • springboot系列(十九):如何集成redis?不会我教你
  • springboot系列(二十):如何通过redis实现手机号验证码功能 
  • ... ...

八、文末🔥

        如果还想要学习更多,小伙伴们可关注bug菌专门为大家创建的专栏《springboot零基础入门教学》,从无到有,从零到一!希望能帮助到更多小伙伴们。






【开发云】年年都是折扣价,不用四处薅羊毛


       我是bug菌,一名想走👣出大山改变命运的程序猿。接下来的路还很长,都等待着我们去突破、去挑战。来吧,小伙伴们,我们一起加油!未来皆可期,fighting!

        最后送大家两句我很喜欢的话,与诸君共勉!



☘️做你想做的人,没有时间限制,只要愿意,什么时候都可以start。

🍀你能从现在开始改变,也可以一成不变,这件事,没有规矩可言,你可以活出最精彩的自己。



​​​



💌如果文章对您有所帮助,就请留下您的吧!(#^.^#);


💝如果喜欢bug菌分享的文章,就请给bug菌点个关注吧!(๑′ᴗ‵๑)づ╭❤~;


💗如果对文章有任何疑问,还请文末留言或者加群吧;


💞鉴于个人经验有限,所有观点及技术研点,如有异议,请直接回复参与讨论(请勿发表攻击言论,谢谢);


💕版权声明:原创不易,转载请附上原文出处链接和本文声明,版权所有,盗版必究!!!谢谢。



推荐阅读
  • Eclipse 中 JSP 开发环境配置指南
    本文详细介绍了如何在 Eclipse 集成开发环境中配置 JSP 运行环境,包括必要的软件下载、Tomcat 服务器的配置以及常见问题的解决方法。 ... [详细]
  • 深入解析Spring Cloud微服务架构与分布式系统实战
    本文详细介绍了Spring Cloud在微服务架构和分布式系统中的应用,结合实际案例和最新技术,帮助读者全面掌握微服务的实现与优化。 ... [详细]
  • 构建Filebeat-Kafka-Logstash-ElasticSearch-Kibana日志收集体系
    本文介绍了如何使用Filebeat、Kafka、Logstash、ElasticSearch和Kibana构建一个高效、可扩展的日志收集与分析系统。各组件分别承担不同的职责,确保日志数据能够被有效收集、处理、存储及可视化。 ... [详细]
  • 本文介绍了一个基于 Java SpringMVC 和 SSM 框架的综合系统,涵盖了操作日志记录、文件管理、头像编辑、权限控制、以及多种技术集成如 Shiro、Redis 等,旨在提供一个高效且功能丰富的开发平台。 ... [详细]
  • 本文深入探讨了MySQL中常见的面试问题,包括事务隔离级别、存储引擎选择、索引结构及优化等关键知识点。通过详细解析,帮助读者在面对BAT等大厂面试时更加从容。 ... [详细]
  • docker镜像重启_docker怎么启动镜像dock ... [详细]
  • 一面问题:MySQLRedisKafka线程算法mysql知道哪些存储引擎,它们的区别mysql索引在什么情况下会失效mysql在项目中的优化场景&# ... [详细]
  • 本文从数据埋点的设计者视角出发,全面解析数据埋点的技术原理、应用场景及其管理方法,涵盖基础知识、实施策略、数据处理流程等内容。 ... [详细]
  • 利用GitHub热门资源,成功斩获阿里、京东、腾讯三巨头Offer
    Spring框架作为Java生态系统中的重要组成部分,因其强大的功能和灵活的扩展性,被广泛应用于各种规模的企业级应用开发中。本文将通过一份在GitHub上获得极高评价的Spring全家桶文档,探讨如何掌握Spring框架及其相关技术,助力职业发展。 ... [详细]
  • Kafka 示例项目中 Log4j 的配置与调试
    本文详细介绍了如何在 Kafka 源码中的示例项目配置 Log4j,以确保能够正确记录日志信息,帮助开发者更好地理解和调试代码。 ... [详细]
  • 本文详细介绍了优化DB2数据库性能的多种方法,涵盖统计信息更新、缓冲池调整、日志缓冲区配置、应用程序堆大小设置、排序堆参数调整、代理程序管理、锁机制优化、活动应用程序限制、页清除程序配置、I/O服务器数量设定以及编入组提交数调整等方面。通过这些技术手段,可以显著提升数据库的运行效率和响应速度。 ... [详细]
  • 深入解析Spring Boot自动配置机制
    本文旨在深入探讨Spring Boot的自动配置机制,特别是如何利用配置文件进行有效的设置。通过实例分析,如Http编码自动配置,我们将揭示配置项的具体作用及其背后的实现逻辑。 ... [详细]
  • 深入理解Kafka架构
    本文将详细介绍Kafka的内部工作机制,包括其工作流程、文件存储机制、生产者与消费者的具体实现,以及如何通过高效读写技术和Zookeeper支持来确保系统的高性能和稳定性。 ... [详细]
  • 58同城的Elasticsearch应用与平台构建实践
    本文由58同城高级架构师于伯伟分享,由陈树昌编辑整理,内容源自DataFunTalk。文章探讨了Elasticsearch作为分布式搜索和分析引擎的应用,特别是在58同城的实施案例,包括集群优化、典型应用实例及自动化平台建设等方面。 ... [详细]
  • 历经两个月,他成功斩获阿里巴巴Offer
    经过两个月的努力,一位普通的双非本科毕业生最终成功获得了阿里巴巴的录用通知。 ... [详细]
author-avatar
历史本轻狂
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有